package com.ym.excel.config;

import java.nio.charset.StandardCharsets;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;

/**
 * Spring configuration that declares the RabbitMQ infrastructure beans used by the
 * application: the connection factory, a {@link RabbitAdmin}, a {@link RabbitTemplate},
 * and the publisher confirm / return callbacks attached to that template.
 *
 * <p>Connection settings are injected from the {@code yangmin.rabbitmq.*} properties.
 */
@Configuration
@EnableConfigurationProperties
public class RabbitMqConfig {

    /** Shared logger. The logger name is a runtime string and is intentionally left as-is. */
    protected static final Logger logger = LoggerFactory.getLogger("消息中间配置");

    /** Publisher confirms: must be enabled for the confirm callback to be invoked. */
    private static final boolean PUBLISHER_CONFIRMS = true;

    /** Injected Spring environment. Not referenced by any method in this class. */
    @Autowired
    private Environment environment;

    /** Broker host name, from {@code yangmin.rabbitmq.host}. */
    @Value("${yangmin.rabbitmq.host}")
    private String host;

    /** Broker port, from {@code yangmin.rabbitmq.port}. */
    @Value("${yangmin.rabbitmq.port}")
    private int port;

    /** Broker user name, from {@code yangmin.rabbitmq.username}. */
    @Value("${yangmin.rabbitmq.username}")
    private String username;

    /** Broker password, from {@code yangmin.rabbitmq.password}. Never logged. */
    @Value("${yangmin.rabbitmq.password}")
    private String password;

    /**
     * Builds the caching connection factory from the injected {@code yangmin.rabbitmq.*}
     * properties.
     *
     * @return a {@link CachingConnectionFactory} with publisher confirms and returns enabled
     */
    @Bean
    public ConnectionFactory connectionFactory() {
        logger.info("正在获取消息中间配置信息");
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        // The address is passed as a single "host:port" entry.
        connectionFactory.setAddresses(host + ":" + port);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        // Must be true, otherwise the template's ConfirmCallback is never called.
        connectionFactory.setPublisherConfirms(PUBLISHER_CONFIRMS);
        // The Spring AMQP reference documentation lists publisherReturns=true on the
        // CachingConnectionFactory as a requirement for returned (mandatory) messages,
        // so it is enabled alongside confirms for the ReturnCallback registered below.
        connectionFactory.setPublisherReturns(true);
        return connectionFactory;
    }

    /**
     * Creates the {@link RabbitAdmin} on top of the given connection factory.
     *
     * @param connectionFactory the connection factory bean
     * @return a new {@link RabbitAdmin}
     */
    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
        return new RabbitAdmin(connectionFactory);
    }

    /**
     * Creates the {@link RabbitTemplate} with the mandatory flag set and both publisher
     * callbacks (confirm and return) registered.
     *
     * @return the configured template
     */
    @Bean
    public RabbitTemplate newRabbitTemplate() {
        RabbitTemplate template = new RabbitTemplate(connectionFactory());
        /*
         * mandatory = true enables "failed delivery" notification: when the exchange cannot
         * route the message to any queue (based on exchange type and routing key), the broker
         * hands it back to the producer via basic.return. With mandatory = false the broker
         * silently drops the message in that situation.
         */
        template.setMandatory(true);
        // Producer-side publish confirmation.
        template.setConfirmCallback(confirmCallback());
        // Invoked when a message reached the exchange but no bound queue matched it.
        template.setReturnCallback(returnCallback());
        return template;
    }

    /**
     * Publisher confirm callback: logs whether the broker acknowledged the published message.
     *
     * @return a callback that logs acks at INFO and nacks (with their cause) at WARN
     */
    @Bean
    public RabbitTemplate.ConfirmCallback confirmCallback() {
        return new RabbitTemplate.ConfirmCallback() {

            @Override
            public void confirm(CorrelationData correlationData,
                                boolean ack, String cause) {
                if (ack) {
                    logger.info("发送者确认发送给mq成功");
                } else {
                    // A negative ack is a delivery failure, so it is logged as a warning;
                    // this is the place to hook in resend handling for failed messages.
                    logger.warn("发送者发送给mq失败,考虑重发:{}", cause);
                }
            }
        };
    }

    /**
     * Return callback: invoked when a message was sent to an exchange but no queue bound to
     * that exchange matched it. Logs the return details and the message body.
     *
     * @return a callback that logs the returned message at WARN
     */
    @Bean
    public RabbitTemplate.ReturnCallback returnCallback() {
        return new RabbitTemplate.ReturnCallback() {

            @Override
            public void returnedMessage(Message message,
                                        int replyCode,
                                        String replyText,
                                        String exchange,
                                        String routingKey) {
                logger.warn("Returned replyCode：{}", replyCode);
                logger.warn("Returned replyText：{}", replyText);
                logger.warn("Returned exchange：{}", exchange);
                logger.warn("Returned routingKey：{}", routingKey);
                // Decode with an explicit charset; the no-charset String constructor uses the
                // platform default and would garble non-ASCII payloads on some hosts.
                byte[] body = message.getBody();
                String msgJson = (body == null) ? null : new String(body, StandardCharsets.UTF_8);
                logger.warn("Returned Message：{}", msgJson);
            }
        };
    }

}
